from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import datetime
import pendulum

# Timezone for the DAG's start_date so the daily schedule fires on
# Shanghai local time rather than UTC.
local_tz = pendulum.timezone("Asia/Shanghai")
from dagen.operators import SSHOperator

# Jinja-templated partition date: Airflow renders this at task runtime to the
# execution date in Asia/Shanghai, formatted YYYYMMDD (e.g. "20211001").
# NOTE(review): the original line nested the braces ('{{ {{ ... }} }}' is
# invalid Jinja) and used single quotes inside a single-quoted string, which
# was a Python syntax error — the module could not even be imported.
pt = "{{ execution_date.in_tz('Asia/Shanghai').format('YYYYMMDD') }}"

# Cron schedule: run daily at 05:00 (interpreted in the start_date timezone).
schedule_interval = "0 5 * * *"

dag = DAG(
    dag_id="cdp_user_profile_to_ck",
    catchup=False,  # do not backfill runs missed before deployment
    start_date=datetime.datetime(2021, month=10, day=1, tzinfo=local_tz),
    schedule_interval=schedule_interval,
    # Defaults applied to every task in this DAG.
    default_args={
        'owner': 'Avris',
        'depends_on_past': False,
        'retries': 2,
        'email': ['guanghu@tesla.com'],
        'email_on_failure': True,
        'email_on_retry': False,
    },
)

# spark-submit command executed on the remote host; %s is filled with the
# Jinja template above, which the SSHOperator renders (its `command` field is
# templated) before the SSH session runs it.
cmd = "/opt/bigdata/spark/current/bin/spark-submit --master yarn --class com.tesla.cdp.service.WriteClickHouse --deploy-mode client --name test-job --jars /home/hadoop/eden-dwh-cdp-tagging-1.0.jar,/opt/bigdata/jar/fastjson-1.2.78.jar,/opt/bigdata/jar/json4s-native_3-4.0.3.jar,/opt/bigdata/jar/config-1.3.3.jar,/opt/bigdata/jar/jcommander-1.72.jar,/opt/bigdata/jar/scala-logging_2.11-3.7.2.jar,/home/hadoop/eden-dwh-cdp-1.0/lib/spark-avro_2.12-3.0.1.jar,/home/hadoop/eden-dwh-cdp-1.0/lib/hudi-utilities-bundle_2.12-0.9.0.jar,/opt/bigdata/jar/clickhouse-jdbc-0.3.1.jar,/home/hadoop/eden-dwh-cdp-1.0/lib/eden-dwh-cdp-1.0.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.hadoop.mapreduce.input.pathFilter.class=org.apache.hudi.hadoop.HoodieROTablePathFilter --conf spark.hadoop.mapreduce.input.pathFilter.class=org.apache.hadoop.fs.PathFilter --conf spark.yarn.queue=batch  /home/hadoop/eden-dwh-cdp-1.0.jar -job-name ads_cdp_ck_user_profile_insert -pt %s" % pt

# Single task: SSH to the stage YARN edge node and launch the Spark job that
# writes the CDP user profile into ClickHouse.
ssh_hook_task = SSHOperator(
    task_id="cdp_user_profile_to_ck_task",
    ssh_conn_id='eden_yarn_stg',
    command=cmd,
    dag=dag,
)
